package com.xinyue.mqsystem.event;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.core.env.Environment;

import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.mygame.common.utils.GameUUIDUtil;
import com.xinyue.mqsystem.mq.GameMQTemplate;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

/**
 * Static facade of the RocketMQ-based game event system.
 * <p>
 * After {@link #init(int, String, ApplicationContext)} has been called, the class
 * <ul>
 * <li>knows every handler method annotated with {@code @GameEventMapping} that lives in a bean
 * annotated with {@code @GameEventHandler},</li>
 * <li>runs a push consumer subscribed to the event topic, filtered by the event ids (used as tags)
 * for which at least one handler was found, and</li>
 * <li>lets callers publish events through {@link #sendEvent(IEventMessage)}.</li>
 * </ul>
 * All state is static; the registries are filled during {@code init} before the consumer is started.
 */
public class GameMQEventContext {
	/**
	 * Topic name without the namespace prefix.
	 * NOTE(review): "syste" looks like a typo, but the literal is the topic name used at runtime, so it
	 * is left untouched here.
	 */
	private static final String BASE_TOPIC = "game-rocketmq-event-syste-topic";
	/** Effective topic; replaced by "&lt;nacos namespace&gt;-&lt;base topic&gt;" during init. */
	private static String topic = BASE_TOPIC;
	private static GameMQTemplate gameMQTemplate;
	private static String consumeGroupName;
	private static ApplicationContext applicationContext;
	private static final Logger logger = LoggerFactory.getLogger(GameMQEventContext.class);
	/** eventId -> event message class, used to instantiate incoming events. */
	private static Map<Integer, Class<? extends IEventMessage>> eventMessageMap = new HashMap<>();

	/** eventId -> handlers (bean + method) that must be invoked for that event. */
	private static Map<Integer, List<GameEventDispatcherInfo>> gameEventDispatcherMap = new HashMap<>();
	/** Id of the local server; written into the header of every event sent from this process. */
	private static int localServerId;

	/**
	 * Initializes the event system without an explicit consumer group; see
	 * {@link #init(int, String, ApplicationContext)} for how the group name is then derived.
	 *
	 * @param localServerId      id of the local server, recorded in outgoing event headers
	 * @param applicationContext Spring context used to look up the required beans
	 */
	public static void init(int localServerId,ApplicationContext applicationContext) {
		init(localServerId,null, applicationContext);
	}

	/**
	 * Initializes the event system: stores the collaborators, builds the namespaced topic, scans the
	 * handler beans and starts the consumer.
	 * <p>
	 * The consumer group may be set manually; if {@code groupName} is null or empty, the value of
	 * {@code spring.application.name} followed by {@code -EventConsumerGroup} is used instead.
	 *
	 * @author wang guang shuai
	 * @param localServerId      id of the local server, recorded in outgoing event headers
	 * @param groupName          consumer group name, may be null or empty
	 * @param applicationContext Spring context providing {@code GameMQTemplate},
	 *                           {@code NacosDiscoveryProperties}, {@code RocketMQProperties} and the
	 *                           handler beans
	 * @throws IllegalArgumentException if a mapped event class lacks {@code @GameEventMetadata} or two
	 *                                  different event classes declare the same eventId
	 */
	public static void init(int localServerId,String groupName,ApplicationContext applicationContext) {
		GameMQEventContext.localServerId = localServerId;
		GameMQEventContext.gameMQTemplate = applicationContext.getBean(GameMQTemplate.class);
		GameMQEventContext.consumeGroupName = groupName;
		GameMQEventContext.applicationContext = applicationContext;
		NacosDiscoveryProperties properties = applicationContext.getBean(NacosDiscoveryProperties.class);
		// Always derive from the constant: prefixing the mutable field would stack the namespace
		// ("ns-ns-...") if init were ever invoked more than once.
		topic = properties.getNamespace() + "-" + BASE_TOPIC;
		scanGameEvent();
		consumeEventMessage();
	}

	/**
	 * Walks over every bean annotated with {@code @GameEventHandler} and registers each of its public
	 * methods that carries {@code @GameEventMapping}.
	 */
	private static void scanGameEvent() {
		String[] beanNames = applicationContext.getBeanNamesForAnnotation(GameEventHandler.class);
		for (String beanName : beanNames) {
			Object bean = applicationContext.getBean(beanName);
			// NOTE(review): annotations are read from the runtime class of the bean; if handler beans
			// can be class-based proxies, method annotations may not be visible here - confirm.
			for (Method method : bean.getClass().getMethods()) {
				addEventDispatcherInfo(bean, method);
			}
		}
	}

	/**
	 * Returns the ids of all events this service handles.
	 *
	 * @return the key set of the event class registry (a live view, not a copy)
	 * @author wang guang shuai
	 */
	private static Collection<Integer> getAllEventIds() {
		return eventMessageMap.keySet();
	}

	/**
	 * Registers {@code method} as a handler if it is annotated with {@code @GameEventMapping}; methods
	 * without the annotation are ignored.
	 *
	 * @param target bean instance the method will be invoked on
	 * @param method candidate handler method
	 * @throws IllegalArgumentException if the mapped event class has no {@code @GameEventMetadata}, or
	 *                                  if a different event class is already registered under the
	 *                                  same eventId
	 */
	private static void addEventDispatcherInfo(Object target, Method method) {
		GameEventMapping gameEventMapping = method.getAnnotation(GameEventMapping.class);
		if (gameEventMapping == null) {
			return;
		}
		Class<? extends IEventMessage> eventClass = gameEventMapping.value();
		GameEventMetadata gameEventMetadata = eventClass.getAnnotation(GameEventMetadata.class);
		if (gameEventMetadata == null) {
			throw new IllegalArgumentException(
					"MQ 事件对象没有添加元数据信息,请在事件对象上添加@GameEventMetadata信息，事件对象：" + eventClass.getName());
		}
		int eventId = gameEventMetadata.eventId();
		Class<? extends IEventMessage> registeredClass = eventMessageMap.get(eventId);
		if (registeredClass == null) {
			eventMessageMap.put(eventId, eventClass);
		} else if (!registeredClass.getName().equals(eventClass.getName())) {
			// Only a DIFFERENT class claiming the same id is a conflict; several handlers for the
			// same event class are legitimate and simply share the id.
			throw new IllegalArgumentException(
					"事件对象 " + registeredClass.getName() + "," + eventClass.getName() + " eventId重复！！");
		}
		gameEventDispatcherMap.computeIfAbsent(eventId, key -> new ArrayList<>())
				.add(new GameEventDispatcherInfo(target, method));
	}

	/**
	 * Creates a fresh, empty event message for the given id using the no-argument constructor of the
	 * registered class.
	 *
	 * @param eventId id of the event
	 * @return a new instance, or {@code null} if no class is registered for the id or the instance
	 *         could not be created (the failure is logged)
	 */
	private static IEventMessage getEventMessageInstance(int eventId) {
		Class<? extends IEventMessage> clazz = eventMessageMap.get(eventId);
		if (clazz == null) {
			return null;
		}
		try {
			// Class.newInstance() is deprecated and rethrows constructor exceptions unwrapped; going
			// through the constructor wraps them so they are caught below.
			return clazz.getDeclaredConstructor().newInstance();
		} catch (ReflectiveOperationException e) {
			logger.error("创建EventMessage实例失败,eventId:{}", eventId, e);
			return null;
		}
	}

	/**
	 * Publishes an event. A newly generated id and the local server id are written into the header,
	 * the message is serialized with {@code write()} and handed to
	 * {@code GameMQTemplate.asyncSendOrderly} with the event id as its only tag.
	 *
	 * @param eventMessage event to publish; its header is modified by this call
	 */
	public static void sendEvent(IEventMessage eventMessage) {
		String id = GameUUIDUtil.getUId();
		eventMessage.getHeader().setId(id);
		eventMessage.getHeader().setFromServerId(localServerId);
		byte[] data = eventMessage.write();
		// The tag is the event id; consumers subscribe by tag to receive only the events they handle.
		String tag = String.valueOf(eventMessage.getHeader().getEventId());
		List<String> tags = Arrays.asList(tag);
		gameMQTemplate.asyncSendOrderly(topic, data, tags);
		logger.debug("发送MQ事件，topic:{},event:{},tags:{}", topic, eventMessage, tag);
	}

	/**
	 * Invokes every handler registered for the event's id. A failing handler is logged and does not
	 * prevent the remaining handlers from running.
	 *
	 * @param gaMessage fully decoded event
	 */
	private static void dispatcherEvent(IEventMessage gaMessage) {
		EventMessageHeader header = gaMessage.getHeader();
		List<GameEventDispatcherInfo> gameEventDispatcherInfos = gameEventDispatcherMap.get(header.getEventId());
		if (gameEventDispatcherInfos == null) {
			return;
		}
		for (GameEventDispatcherInfo info : gameEventDispatcherInfos) {
			try {
				info.getMethod().invoke(info.getTarget(), gaMessage);
			} catch (Throwable e) {
				logger.error("Game event call {}#{}失败", info.getTarget().getClass().getName(),
						info.getMethod().getName(), e);
			}
		}
	}

	/**
	 * Decodes one MQ message and dispatches it. The body is expected to start with a 4-byte event id
	 * followed by the payload consumed by {@code IEventMessage.read}. Any problem is logged and the
	 * message is dropped; nothing is thrown to the caller.
	 *
	 * @param message message received from the broker
	 */
	private static void handleMessage(MessageExt message) {
		byte[] body = message.getBody();
		if (body == null) {
			return;
		}
		ByteBuf byteBuf = Unpooled.wrappedBuffer(body);
		if (byteBuf.readableBytes() < Integer.BYTES) {
			logger.error("MQ event body is too short to contain an eventId, length:{}", body.length);
			return;
		}
		int eventId = byteBuf.readInt();
		try {
			IEventMessage eventMessage = getEventMessageInstance(eventId);
			if (eventMessage == null) {
				logger.error("No event message instance available, message skipped, eventId:{}", eventId);
				return;
			}
			eventMessage.read(byteBuf);
			// Dispatch the event; note that this runs on the consumer thread.
			dispatcherEvent(eventMessage);
		} catch (Exception e) {
			logger.error("MQ事件处理异常，eventId:{}",eventId,e);
		}
	}

	/**
	 * Creates and starts the push consumer for the event topic. Nothing is subscribed or started when
	 * no event id has been registered. A failure to subscribe or start is logged, not rethrown.
	 */
	private static void consumeEventMessage() {
		Environment environment = applicationContext.getEnvironment();
		RocketMQProperties rocketMQProperties = applicationContext.getBean(RocketMQProperties.class);
		// Consumer group: within one group a message of the subscribed topic is consumed only once,
		// which prevents duplicate consumption.
		if (consumeGroupName == null || consumeGroupName.length() == 0) {
			consumeGroupName = environment.getProperty("spring.application.name") + "-EventConsumerGroup";
		}
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumeGroupName);
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.setMessageModel(MessageModel.CLUSTERING);
		consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
		Collection<Integer> eventIds = getAllEventIds();
		if (eventIds.isEmpty()) {
			return;
		}
		Collection<String> tags = eventIds.stream().map(String::valueOf).collect(Collectors.toList());
		// Message selection: only messages carrying one of these tags are received.
		String subExpression = GameMQTemplate.convertTags(tags);
		try {
			consumer.subscribe(topic, subExpression);
			// Register the consume listener. An anonymous class is used on purpose:
			// registerMessageListener is overloaded, so an implicitly typed lambda would be ambiguous.
			consumer.registerMessageListener(new MessageListenerOrderly() {

				@Override
				public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
					for (MessageExt message : msgs) {
						handleMessage(message);
					}
					// Failures are logged per message; the batch is always acknowledged.
					return ConsumeOrderlyStatus.SUCCESS;
				}
			});
			consumer.start();
			// Only report success once start() has actually returned without an exception.
			logger.info("MQ 事件系统启动成功,GroupName:{},topic:{},tags：{}", consumeGroupName, topic, subExpression);
		} catch (MQClientException e) {
			logger.error("启动事件RocketMQ消费者失败", e);
		}
	}

}
